package cn.saas.rxjava;

import android.os.Build;
import android.os.Bundle;
import android.text.TextUtils;
import android.util.Log;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.TextView;

import androidx.annotation.RequiresApi;
import androidx.appcompat.app.AppCompatActivity;
import androidx.recyclerview.widget.LinearLayoutManager;
import androidx.recyclerview.widget.RecyclerView;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;


import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Notification;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.ObservableSource;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.BiConsumer;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.BiPredicate;
import io.reactivex.rxjava3.functions.BooleanSupplier;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class MainActivity extends AppCompatActivity {

    private RecyclerView recyclerView;
    private List<String> mList = new ArrayList<>();

    private Observer observer = new Observer<Integer>() {
        // 以下步骤仅为展示一个完整demo，可以忽略
        // 3. 通过通过订阅（subscribe）连接观察者和被观察者
        // 4. 创建观察者 & 定义响应事件的行为
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("=onSubscribe=","开始采用subscribe连接");
        }

        @Override
        public void onNext(Integer integer) {
            Log.e("=onNext=","接收到了事件" + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.e("=onError=","对Error事件作出响应");
        }

        @Override
        public void onComplete() {
            Log.e("=onComplete=","对Complete事件作出响应");
        }
    };

    private Observer observerLong = new Observer<Long>() {
        // 以下步骤仅为展示一个完整demo，可以忽略
        // 3. 通过通过订阅（subscribe）连接观察者和被观察者
        // 4. 创建观察者 & 定义响应事件的行为
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("=onSubscribe=","开始采用subscribe连接");
        }

        @Override
        public void onNext(Long integer) {
            Log.e("=onNext=","接收到了事件" + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.e("=onError=","对Error事件作出响应");
        }

        @Override
        public void onComplete() {
            Log.e("=onComplete=","对Complete事件作出响应");
        }
    };
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        mList.add("创建操作符");
        mList.add("create()");
        mList.add("just()");
        mList.add("fromArray()");
        mList.add("fromIterable()");
        mList.add("defer()");
        mList.add("timer()");
        mList.add("interval()");
        mList.add("intervalRange()");
        mList.add("range()");
        mList.add("rangeLong()");
        mList.add("变换操作符");
        mList.add("map()");
        mList.add("flatMap()");
        mList.add("concatMap()");
        mList.add("switchMap()");
        mList.add("buffer()");
        mList.add("布尔操作符");
        mList.add("all()");
        mList.add("takeWhile()");
        mList.add("skipWhile()");
        mList.add("takeUntil()");
        mList.add("skipUntil()");
        mList.add("SequenceEqual()");
        mList.add("contains()");
        mList.add("isEmpty()");
        mList.add("amb()");
        mList.add("defaultIfEmpty()");
        mList.add("过滤操作符");
        mList.add("filter()");
        mList.add("ofType()");
        mList.add("skip()");
        mList.add("skipLast()");
        mList.add("distinct()");
        mList.add("distinctUntilChanged()");
        mList.add("take()");
        mList.add("takeLast()");
        mList.add("throttleFirst()");
        mList.add("throttleLast()");
        mList.add("Sample()");
        mList.add("throttleWithTimeout()");
        mList.add("debounce()");
        mList.add("firstElement()");
        mList.add("lastElement()");
        mList.add("elementAt()");
        mList.add("elementAtOrError()");
        mList.add("组合/合并操作符");
        mList.add("concat()");
        mList.add("concatArray()");
        mList.add("merge()");
        mList.add("mergeArray()");
        mList.add("concatDelayError()");
        mList.add("mergeDelayError()");
        mList.add("zip()");
        mList.add("combineLatest()");
        mList.add("combineLatestDelayError()");
        mList.add("reduce()");
        mList.add("collect()");
        mList.add("startWith()");
        mList.add("startWithArray()");
        mList.add("count()");
        mList.add("功能性操作符");
        mList.add("subscribe()");
        mList.add("delay()");
        mList.add("do()");
        mList.add("onErrorReturn()");
        mList.add("onErrorResumeNext()");
        mList.add("onExceptionResumeNext()");
        mList.add("retry()");
        mList.add("retryUntil()");
        mList.add("retryWhen()");
        mList.add("repeat()");
        mList.add("repeatWhen()");
        mList.add("线程切换");
        mList.add("subscribeOn()");
        mList.add("observeOn()");
        recyclerView = findViewById(R.id.recycler);
        recyclerView.setLayoutManager(new LinearLayoutManager(this));
        recyclerView.setAdapter(new RecyclerView.Adapter() {
            @NonNull
            @Override
            public RecyclerView.ViewHolder onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
                return new CustomViewHolder(LayoutInflater.from(getBaseContext()).inflate(android.R.layout.simple_list_item_1,parent,false));
            }

            @Override
            public void onBindViewHolder(@NonNull RecyclerView.ViewHolder holder, int position) {
                if(holder instanceof CustomViewHolder){
                    ((CustomViewHolder) holder).bind(mList.get(position));
                }
            }

            @Override
            public int getItemCount() {
                return mList.size();
            }

            class CustomViewHolder extends RecyclerView.ViewHolder{
                TextView textView;
                public CustomViewHolder(@NonNull View itemView) {
                    super(itemView);
                    textView = itemView.findViewById(android.R.id.text1);
                }

                public void bind(String str){
                    textView.setText(str);
                    itemView.setOnClickListener(new View.OnClickListener() {
                        @Override
                        public void onClick(View view) {
                            //创建操作符
                            if(TextUtils.equals(str,"create()")){
                                create();
                            };
                            if(TextUtils.equals(str,"just()")){
                                just();
                            };
                            if(TextUtils.equals(str,"fromArray()")){
                                fromArray();
                            };
                            if(TextUtils.equals(str,"fromIterable()")){
                                fromIterable();
                            };
                            if(TextUtils.equals(str,"defer()")){
//                                defer();
                            };
                            if(TextUtils.equals(str,"interval()")){
                                interval();
                            };
                            if(TextUtils.equals(str,"intervalRange()")){
                                intervalRange();
                            };
                            if(TextUtils.equals(str,"range()")){
                                range();
                            };
                            if(TextUtils.equals(str,"rangeLong()")){
                                rangeLong();
                            };
                            //变换操作符
                            if(TextUtils.equals(str,"map()")){
                                map();
                            };
                            if(TextUtils.equals(str,"flatMap()")){
                                flatMap();
                            };
                            if(TextUtils.equals(str,"concatMap()")){
                                concatMap();
                            };
                            if(TextUtils.equals(str,"switchMap()")){
                                switchMap();
                            };
                            if(TextUtils.equals(str,"buffer()")){
                                buffer();
                            };
                            //布尔操作符
                            if(TextUtils.equals(str,"all()")){
                                all();
                            };
                            if(TextUtils.equals(str,"takeWhile()")){
                                takeWhile();
                            };
                            if(TextUtils.equals(str,"skipWhile()")){
                                skipWhile();
                            };
                            if(TextUtils.equals(str,"takeUntil()")){
                                takeUntil();
                            };
                            if(TextUtils.equals(str,"skipUntil()")){
                                skipUntil();
                            };
                            if(TextUtils.equals(str,"SequenceEqual()")){
                                SequenceEqual();
                            };
                            if(TextUtils.equals(str,"contains()")){
                                contains();
                            };
                            if(TextUtils.equals(str,"isEmpty()")){
                                isEmpty();
                            };
                            if(TextUtils.equals(str,"amb()")){
                                amb();
                            };
                            if(TextUtils.equals(str,"defaultIfEmpty()")){
                                defaultIfEmpty();
                            };
                            //过滤操作符
                            if(TextUtils.equals(str,"filter()")){
                                filter();
                            };
                            if(TextUtils.equals(str,"ofType()")){
                                ofType();
                            };
                            if(TextUtils.equals(str,"skip()")){
                                skip();
                            };
                            if(TextUtils.equals(str,"skipLast()")){
                                skipLast();
                            };
                            if(TextUtils.equals(str,"distinct()")){
                                distinct();
                            };
                            if(TextUtils.equals(str,"distinctUntilChanged()")){
                                distinctUntilChanged();
                            };
                            if(TextUtils.equals(str,"take()")){
                                take();
                            };
                            if(TextUtils.equals(str,"takeLast()")){
                                takeLast();
                            };
                            if(TextUtils.equals(str,"throttleFirst()")){
                                throttleFirst();
                            };
                            if(TextUtils.equals(str,"throttleLast()")){
                                throttleLast();
                            };
                            if(TextUtils.equals(str,"Sample()")){
                                Sample();
                            };
                            if(TextUtils.equals(str,"throttleWithTimeout()")){
                                throttleWithTimeout();
                            };
                            if(TextUtils.equals(str,"debounce()")){
                                debounce();
                            };
                            if(TextUtils.equals(str,"firstElement()")){
                                firstElement();
                            };
                            if(TextUtils.equals(str,"lastElement()")){
                                lastElement();
                            };
                            if(TextUtils.equals(str,"elementAt()")){
                                elementAt();
                            };
                            if(TextUtils.equals(str,"elementAtOrError()")){
                                elementAtOrError();
                            };
                            //组合合并操作符
                            if(TextUtils.equals(str,"concat()")){
                                concat();
                            };
                            if(TextUtils.equals(str,"concatArray()")){
                                concatArray();
                            };
                            if(TextUtils.equals(str,"merge()")){
                                merge();
                            };
                            if(TextUtils.equals(str,"mergeArray()")){
                                mergeArray();
                            };
                            if(TextUtils.equals(str,"concatDelayError()")){
                                concatDelayError();
                            };
                            if(TextUtils.equals(str,"mergeDelayError()")){
                                mergeDelayError();
                            };
                            if(TextUtils.equals(str,"zip()")){
                                zip();
                            };
                            if(TextUtils.equals(str,"combineLatest()")){
                                combineLatest();
                            };
                            if(TextUtils.equals(str,"combineLatestDelayError()")){
                                combineLatestDelayError();
                            };
                            if(TextUtils.equals(str,"reduce()")){
                                reduce();
                            };
                            if(TextUtils.equals(str,"collect()")){
                                collect();
                            };
                            if(TextUtils.equals(str,"startWith()")){
                                startWith();
                            };
                            if(TextUtils.equals(str,"startWithArray()")){
                                startWithArray();
                            };
                            if(TextUtils.equals(str,"count()")){
                                count();
                            };
                            //功能性操作符
                            if(TextUtils.equals(str,"subscribe()")){
                                subscribe();
                            };
                            if(TextUtils.equals(str,"delay()")){
                                delay();
                            };
                            if(TextUtils.equals(str,"do()")){
                                dox();
                            };
                            if(TextUtils.equals(str,"onErrorReturn()")){
                                onErrorReturn();
                            };
                            if(TextUtils.equals(str,"onErrorResumeNext()")){
                                onErrorResumeNext();
                            };
                            if(TextUtils.equals(str,"onExceptionResumeNext()")){
                                onExceptionResumeNext();
                            };
                            if(TextUtils.equals(str,"retry()")){
                                retry();
                            };
                            if(TextUtils.equals(str,"retryUntil()")){
                                retryUntil();
                            };
                            if(TextUtils.equals(str,"retryWhen()")){
                                retryWhen();
                            };
                            if(TextUtils.equals(str,"repeat()")){
                                repeat();
                            };
                            if(TextUtils.equals(str,"repeatWhen()")){
                                repeatWhen();
                            };
                            //线程切换
                            if(TextUtils.equals(str,"subscribeOn()")){
                                subscribeOn();
                            };
                            if(TextUtils.equals(str,"observeOn()")){
                                observeOn();
                            };
                        }
                    });
                }

            }
        });
    }

    //额外一些方法 下列方法一般用于测试使用
    public void extra(){
        // 该方法创建的被观察者对象发送事件的特点：仅发送Complete事件，直接通知完成
        Observable observable1=Observable.empty();
        // 即观察者接收后会直接调用onCompleted()

        // 该方法创建的被观察者对象发送事件的特点：仅发送Error事件，直接通知异常
        // 可自定义异常
        Observable observable2=Observable.error(new RuntimeException());
        // 即观察者接收后会直接调用onError()

        // 该方法创建的被观察者对象发送事件的特点：不发送任何事件
        Observable observable3=Observable.never();
        // 即观察者接收后什么都不调用
    }

    //完整创建1个被观察者对象（Observable） 在具体使用时，一般采用 链式调用 来创建  Observable 被观察者
    public void create(){
        //1. 通过create()创建被观察者 Observable 对象
        Observable.create(new ObservableOnSubscribe<Integer>() {
            // 传入参数： OnSubscribe 对象
            // 当 Observable 被订阅时，OnSubscribe 的 call() 方法会自动被调用，即事件序列就会依照设定依次被触发
            // 即观察者会依次调用对应事件的复写方法从而响应事件
            // 从而实现由被观察者向观察者的事件传递 & 被观察者调用了观察者的回调方法 ，即观察者模式
            // 通过 ObservableEmitter类对象 产生 & 发送事件
            // 2. 在复写的subscribe（）里定义需要发送的事件
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // ObservableEmitter类介绍
                // a. 定义：事件发射器
                // b. 作用：定义需要发送的事件 & 向观察者发送事件
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            } // 至此，一个被观察者对象（Observable）就创建完毕
        }).subscribe(observer);
    }

    //1.快速创建1个被观察者对象（Observable）
    //2.发送事件的特点：直接发送 传入的事件
    public void just(){
        // 1. 创建时传入整型1、2,在创建后就会发送这些对象，相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
        Observable.just(1,2)
                // 至此，一个Observable对象创建完毕，以下步骤仅为展示一个完整demo，可以忽略
                // 2. 通过通过订阅（subscribe）连接观察者和被观察者
                // 3. 创建观察者 & 定义响应事件的行为
                .subscribe(observer);
    }

    //1.快速创建1个被观察者对象（Observable）
    //2.发送事件的特点：直接发送 传入的数组数据
    //Todo 3.若直接传递一个list集合进去，否则会直接把list当做一个数据元素发送  只能是数组数据
    public void fromArray(){
        // 1. 设置需要传入的数组
        Integer[] integers = {1,2,3};
        // 2. 创建被观察者对象（Observable）时传入数组 在创建后就会将该数组转换成Observable & 发送该对象中的所有数据
        Observable.fromArray(integers).subscribe(observer);
    }

    //1.快速创建1个被观察者对象（Observable）
    //2.发送事件的特点：直接发送 传入的集合List数据
    //Todo 3.此处是集合
    public void fromIterable(){
        // 1. 设置需要传入的集合
        List<Integer> mList = new ArrayList<>();
        mList.add(1);mList.add(2);mList.add(3);mList.add(4);
        // 2. 通过fromIterable()将集合中的对象 / 数据发送出去
        Observable.fromIterable(mList).subscribe(observer);
    }

    //直到有观察者（Observer ）订阅时，才动态创建被观察者对象（Observable） & 发送事件
    //第一次赋值
    Integer i = 10;
/*    public void defer(){
        // 2. 通过defer 定义被观察者对象
        // Todo 注：此时被观察者对象还没创建
        Observable observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(i);
            }
        });
        i = 5;
        // 注：此时，才会调用defer（）创建被观察者对象（Observable）
        observable.subscribe(observer);
    }*/

    //快速创建1个被观察者对象（Observable）
    //发送事件的特点：延迟指定时间后，发送1个数值0（Long类型）
    // 注：timer操作符默认运行在一个新线程上 也可自定义线程调度器（第3个参数）：timer(long,TimeUnit,Scheduler)
    public void timer(){
        //此处是Long类型 所以重新定义
        Observable.timer(5, TimeUnit.SECONDS).subscribe(observerLong);

    }

    //1.快速创建1个被观察者对象（Observable）
    //2.发送事件的特点：每隔指定时间 就发送 事件
    public void interval(){
        // 参数说明：
        // 参数1 = 第1次延迟时间；
        // 参数2 = 间隔时间数字；
        // 参数3 = 时间单位；
        Observable.interval(3,1,TimeUnit.SECONDS)
                // 该例子发送的事件序列特点：延迟3s后发送事件，每隔1秒产生1个数字（从0开始递增1，无限个）
                .subscribe(observerLong);
    }

    //快速创建1个被观察者对象（Observable）
    //发送事件的特点：每隔指定时间 就发送 事件，可指定发送的数据的数量
    public void intervalRange(){
        // 参数说明：
        // 参数1 = 事件序列起始点；
        // 参数2 = 事件数量；
        // 参数3 = 第1次事件延迟发送时间；
        // 参数4 = 间隔时间数字；
        // 参数5 = 时间单位
        Observable.intervalRange(2,10,3,2,TimeUnit.SECONDS)
                // 该例子发送的事件序列特点：
                // 1. 从2开始，一共发送10个事件；
                // 2. 第1次延迟3s发送，之后每隔2秒产生1个数字（从0开始递增1，无限个）
                .subscribe(observerLong);
    }

    //快速创建1个被观察者对象（Observable）
    //发送事件的特点：连续发送 1个事件序列，可指定范围
    //Todo 作用类似于intervalRange（），但区别在于：无延迟发送事件
    public void range(){
        Observable.range(2,10).subscribe(observer);
    }

    //类似于range（），区别在于该方法支持数据类型 = Long
    public void rangeLong(){
        Observable.rangeLong(2,10).subscribe(observerLong);
    }

    //对被观察者发送的每1个事件都通过 指定的函数 处理，从而变换成另外一种事件
    public void map(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            // 1. 被观察者发送事件 = 参数为整型 = 1、2、3
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        })
                // 2. 使用Map变换操作符中的Function函数对被观察者发送的事件进行统一变换：整型变换成字符串类型
                // Todo 注意 单纯的类型转换 Integer 转 String 并不是Observable<String>
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(@NonNull Integer integer) throws Exception {
                        return "使用 Map变换操作符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer ;
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e("=accept=",s);
            }
        });
    };

    //将被观察者发送的事件序列进行 拆分 & 单独转换，再合并成一个新的事件序列，最后再进行发送
    //1.为事件序列中每个事件都创建一个 Observable 对象；
    //2.将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象；
    //3.将新建的每个Observable 都合并到一个 新建的、总的Observable 对象；
    //4.新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者（Observer）
    //Todo 注意此处是无序 无序的将被观察者发送的整个事件序列进行变换
    public void flatMap(){
        Observable.just("A","B","C","D","E").flatMap(new Function<String, ObservableSource<String>>() {
            @NonNull
            @Override
            public ObservableSource<String> apply(@NonNull String s) throws Exception {
                return Observable.just(s).subscribeOn(Schedulers.newThread());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e("=accept=",s);
            }
        });
    };

    //与flatMap 一样，不同之处是 concatMap 是有序的
    public void concatMap(){
        Observable.just("A","B","C","D","E").concatMap(new Function<String, ObservableSource<String>>() {
            @NonNull
            @Override
            public ObservableSource<String> apply(@NonNull String s) throws Exception {
                return Observable.just(s).subscribeOn(Schedulers.newThread());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e("=accept=",s);
            }
        });
    };

    //与flatMap 一样 不同之处是 switchMap 在不同的线程中他只会保留最后一次的线程订阅
    public void switchMap(){
        Observable.just("A","B","C","D","E").switchMap(new Function<String, ObservableSource<String>>() {
            @NonNull
            @Override
            public ObservableSource<String> apply(@NonNull String s) throws Exception {
                return Observable.just(s).subscribeOn(Schedulers.newThread());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e("=accept=",s);
            }
        });
    };

    //定期从 被观察者（Observable）需要发送的事件中 获取一定数量的事件 & 放到缓存区中，最终发送
    public void buffer(){
        Observable.just(1,2,3,4,5)
                // 设置缓存区大小 & 步长
                // 缓存区大小 = 每次从被观察者中获取的事件数量
                // 步长 = 每次获取新事件的数量
                .buffer(3,1).subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(List<Integer> stringList) {
                //
                Log.e("=onNext=", " 缓存区里的事件数量 = " +  stringList.size());
                for (Integer value : stringList) {
                    Log.e("=onNext=", " 事件 = " + value);
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.e("=onError=", "对Error事件作出响应" );
            }

            @Override
            public void onComplete() {
                Log.e("=onComplete=", "对Complete事件作出响应");
            }
        });
    };

    //判断发送的每项数据是否都满足 设置的函数条件 若满足，返回 true；否则，返回 false
    public void all(){
        Observable.just(1,2,3,4,5).all(new Predicate<Integer>() {
            @Override
            public boolean test(@NonNull Integer integer) throws Exception {
                return integer <= 3;
            }
        }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(@NonNull Boolean aBoolean) throws Exception {
                Log.e("=accept=",aBoolean+"");
            }
        });
    };

    //判断发送的每项数据是否满足 设置函数条件 若发送的数据满足该条件，则发送该项数据；否则不发送
    //Todo 与 filter 不同点是第一项不满足直接终止
    public void takeWhile(){
        Observable.just(1,2,3,4,5).takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(@NonNull Integer integer) throws Exception {
                return integer <= 3;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e("=accept=",integer+"");
            }
        });
    };

    //判断发送的每项数据是否满足 设置函数条件 发送的数据不满足该条件，则发送该项数据；否则不发送
    public void skipWhile(){
        Observable.just(1,2,3,4,5).skipWhile(new Predicate<Integer>() {
            @Override
            public boolean test(@NonNull Integer integer) throws Exception {
                return integer <= 3;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e("=accept=",integer+"");
            }
        });
    };

    //执行条件满足时，停止发送事件
    public void takeUntil(){
        Observable.just(1,2,3,4,5).takeUntil(new Predicate<Integer>() {
            @Override
            public boolean test(@NonNull Integer integer) throws Exception {
                return integer <= 3;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e("=accept=",integer+"");
            }
        });
    };

    //等到 skipUntil（） 传入的Observable开始接受发送数据
    public void skipUntil(){
        Observable.interval(1,TimeUnit.SECONDS).skipUntil(Observable.timer(3,TimeUnit.SECONDS)).subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long integer) throws Exception {
                Log.e("=accept=",integer+"");
            }
        });
    };

    //判定两个Observables需要发送的数据是否相同
    public void SequenceEqual(){
        Observable.sequenceEqual(Observable.just(1,2,3),Observable.just(1,2,3)).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(@NonNull Boolean aBoolean) throws Exception {
                Log.e("=accept=",aBoolean+"");
            }
        });
    };

    //判断发送的数据中是否包含指定数据
    public void contains(){
        Observable.just(1,2,3,4,5).contains(4).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(@NonNull Boolean aBoolean) throws Exception {
                Log.e("=accept=",aBoolean+"");
            }
        });
    };

    //判断发送的数据是否为空
    public void isEmpty(){
        Observable.just(1).isEmpty().subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(@NonNull Boolean aBoolean) throws Exception {
                Log.e("=accept=",aBoolean+"");
            }
        });
    };

    //当需要发送多个 Observable时，只发送 先发送数据的Observable的数据，而其余 Observable则被丢弃
    public void amb(){
        // 设置2个需要发送的Observable & 放入到集合中
        List<ObservableSource<Integer>> list= new ArrayList <>();
        // 第1个Observable延迟1秒发射数据
        list.add( Observable.just(1,2,3).delay(1,TimeUnit.SECONDS));
        // 第2个Observable正常发送数据
        list.add( Observable.just(4,5,6));

        // 一共需要发送2个Observable的数据
        // 但由于使用了amb（）,所以仅发送先发送数据的Observable
        // 即第二个（因为第1个延时了）
        Observable.amb(list).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("=accept=", "接收到了事件 "+integer);
            }
        });
    };

    //在不发送任何有效事件（ Next事件）、仅发送了 Complete 事件的前提下，发送一个默认值
    public void defaultIfEmpty(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onComplete();
            }
        }).defaultIfEmpty(10).subscribe(observer);
    };

    //过滤 特定条件的事件
    public void filter(){
        Observable.just(1,2,3,4,5).filter(new Predicate<Integer>() {
            @Override
            public boolean test(@NonNull Integer integer) throws Exception {
                return integer > 3;
            }
        }).subscribe(observer);
    };

    //过滤 特定数据类型的数据
    public void ofType(){
        Observable.just("1",2,"3",4).ofType(Integer.class).subscribe(observer);
    };

    //跳过某个事件 跳过正序的前x项
    public void skip(){
        Observable.just(1,2,3,4).skip(1).subscribe(observer);
    };

    //跳过某个事件 跳过正序的后x项
    public void skipLast(){
        Observable.just(1,2,3,4).skipLast(2).subscribe(observer);
    };

    //过滤事件序列中重复的事件
    public void distinct(){
        Observable.just(1,2,3,1,2).distinct().subscribe(observer);
    };
    //过滤事件序列中连续重复的事件
    public void distinctUntilChanged(){
        Observable.just(1,2,3,1,2,3,3,4,4 ).distinctUntilChanged().subscribe(observer);
    };

    //指定观察者最多能接收到的事件数量
    public void take(){
        Observable.just(1,2,3,4,5).take(2).subscribe(observer);
    };

    //指定观察者只能接收到被观察者发送的最后几个事件
    public void takeLast(){
        Observable.just(1,2,3,4,5).takeLast(2).subscribe(observer);
    };

    //在某段时间内，只发送该段时间内第1次事件
    public void throttleFirst(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 隔段事件发送时间
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2);
                Thread.sleep(400);
                e.onNext(3);
                Thread.sleep(300);
                e.onNext(4);
                Thread.sleep(300);
                e.onComplete();
            }
        }).throttleFirst(1, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).subscribe(observer);//每1秒中采用数据
    };

    //在某段时间内，只发送该段时间内最后1次事件
    public void throttleLast(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 隔段事件发送时间
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2);
                Thread.sleep(400);
                e.onNext(3);
                Thread.sleep(300);
                e.onNext(4);
                Thread.sleep(300);
                e.onComplete();
            }
        }).throttleLast(1, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).subscribe(observer);
    };

    //与 throttleLast 类似
    public void Sample(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 隔段事件发送时间
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2);
                Thread.sleep(400);
                e.onNext(3);
                Thread.sleep(300);
                e.onNext(4);
                Thread.sleep(300);
                e.onComplete();
            }
        }).sample(1, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).subscribe(observer);
    };

    //发送数据事件时，若2次发送事件的间隔＜指定时间，就会丢弃前一次的数据，直到指定时间内都没有新数据发射时才会发送后一次的数据
    //Todo 保留时间间隔内最新的数据
    public void throttleWithTimeout(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 隔段事件发送时间
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2); // 1和2之间的间隔小于指定时间1s，所以前1次数据（1）会被抛弃，2会被保留
                Thread.sleep(1500);  // 因为2和3之间的间隔大于指定时间1s，所以之前被保留的2事件将发出
                e.onNext(3);
                Thread.sleep(1500);  // 因为3和4之间的间隔大于指定时间1s，所以3事件将发出
                e.onNext(4);
                Thread.sleep(500); // 因为4和5之间的间隔小于指定时间1s，所以前1次数据（4）会被抛弃，5会被保留
                e.onNext(5);
                Thread.sleep(500); // 因为5和6之间的间隔小于指定时间1s，所以前1次数据（5）会被抛弃，6会被保留
                e.onNext(6);
                Thread.sleep(1500); // 因为6和Complete实践之间的间隔大于指定时间1s，所以之前被保留的6事件将发出

                e.onComplete();
            }
        }).throttleWithTimeout(1, TimeUnit.SECONDS).subscribe(observer);//每1秒中采用数据
    };

    public void debounce(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 隔段事件发送时间
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2); // 1和2之间的间隔小于指定时间1s，所以前1次数据（1）会被抛弃，2会被保留
                Thread.sleep(1500);  // 因为2和3之间的间隔大于指定时间1s，所以之前被保留的2事件将发出
                e.onNext(3);
                Thread.sleep(1500);  // 因为3和4之间的间隔大于指定时间1s，所以3事件将发出
                e.onNext(4);
                Thread.sleep(500); // 因为4和5之间的间隔小于指定时间1s，所以前1次数据（4）会被抛弃，5会被保留
                e.onNext(5);
                Thread.sleep(500); // 因为5和6之间的间隔小于指定时间1s，所以前1次数据（5）会被抛弃，6会被保留
                e.onNext(6);
                Thread.sleep(1500); // 因为6和Complete实践之间的间隔大于指定时间1s，所以之前被保留的6事件将发出

                e.onComplete();
            }
        }).debounce(1, TimeUnit.SECONDS).subscribe(observer);
    };

    //仅选取第1个元素
    public void firstElement(){
        Observable.just(1, 2, 3, 4, 5).firstElement().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e("=accept=","接收到了事件" + integer);
            }
        });
    };

    //最后一个元素
    public void lastElement(){
        Observable.just(1, 2, 3, 4, 5).lastElement().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e("=accept=","接收到了事件" + integer);
            }
        });
    };

    //取元素下标
    public void elementAt(){
        // 使用1：获取位置索引 = 2的 元素
        // 位置索引从0开始
        Observable.just(1, 2, 3, 4, 5).elementAt(2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e("=accept=","接收到了事件" + integer);
            }
        });

        // 使用2：获取的位置索引 ＞ 发送事件序列长度时，设置默认参数
        Observable.just(1, 2, 3, 4, 5).elementAt(6,10).subscribe(new Consumer<Integer>() {
            @Override
            public void accept( Integer integer) throws Exception {
                Log.e("=accept=","接收到了事件" + integer);
            }
        });
    };

    //在elementAt（）的基础上，当出现越界情况（即获取的位置索引 ＞ 发送事件序列长度）时，即抛出异常
    public void elementAtOrError(){
        Observable.just(1, 2, 3, 4, 5).elementAtOrError(6).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("=accept=", "接收到了事件" + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {

            }
        });
    };

    //组合多个被观察者一起发送数据，合并后 按发送顺序串行执行 concat（）组合被观察者数量≤4个
    public void concat(){
        Observable.concat( Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)).subscribe(observerLong);
    };

    //而concatArray（）则可＞4个
    public void concatArray(){
        Observable.concatArray(Observable.just(1),Observable.just(2),Observable.just(3),Observable.just(4),Observable.just(5)).subscribe(observer);
    };

    //组合多个被观察者一起发送数据，合并后 按时间线并行执行 merge（）组合被观察者数量≤4个 多线程并发
    //Todo 组合多个被观察者一起发送数据，合并后 按时间线并行执行
    public void merge(){
        Observable.merge(Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)).subscribe(observerLong);
    };

    //mergeArray（）则可＞4个
    public void mergeArray(){
        Observable.mergeArray(Observable.just(1),Observable.just(2),Observable.just(3),Observable.just(4),Observable.just(5)).subscribe(observer);
    };

    //Todo 相比与concat 出现error后中断 此处可以继续执行
    public void concatDelayError(){
        Observable.concatArrayDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onError(new NullPointerException()); // 发送Error事件，因为使用了concatDelayError，所以第2个Observable将会发送事件，等发送完毕后，再发送错误事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6)).subscribe(observer);
    };

    //Todo 相比与merge 出现error后中断 此处可以继续执行
    public void mergeDelayError(){
        Observable.mergeDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onError(new NullPointerException()); // 发送Error事件，因为使用了concatDelayError，所以第2个Observable将会发送事件，等发送完毕后，再发送错误事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6)).subscribe(observer);
    };

    //合并 多个被观察者（Observable）发送的事件，生成一个新的事件序列（即组合过后的事件序列），并最终发送
    //TODO 与concat merge 合并相比 zip 生产一个新的合并后的对象
    public void zip(){
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                // 为了方便展示效果，所以在发送事件后加入2s的延迟
                Thread.sleep(1000);
                emitter.onNext(2);
                Thread.sleep(1000);
                emitter.onNext(3);
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io()); // 设置被观察者1在工作线程1中工作

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
                Thread.sleep(1000);
                emitter.onNext("B");
                Thread.sleep(1000);
                emitter.onNext("C");
                Thread.sleep(1000);
                emitter.onNext("D");
                Thread.sleep(1000);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());// 设置被观察者2在工作线程2中工作
        // 假设不作线程控制，则该两个被观察者会在同一个线程中工作，即发送事件存在先后顺序，而不是同时发送

        // 注：创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String string) throws Exception {
                return  integer + string;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("=onSubscribe=", "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.e("=onNext=", "最终接收到的事件 =  " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("=onError=", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("=onComplete=", "onComplete");
            }
        });
    };

    //当两个Observables中的任何一个发送了数据后，将先发送了数据的Observables 的最新（最后）一个数据 与 另外一个Observable发送的每个数据结合，最终基于该函数的结果发送数据
    //Todo 与zip相比合并最新Observables
    public void combineLatest(){
        Observable.combineLatest(Observable.just(1L, 2L, 3L), Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), new BiFunction<Long, Long, Long>() {
            @NonNull
            @Override
            public Long apply(@NonNull Long aLong, @NonNull Long aLong2) throws Exception {
                Log.e("=apply=", "合并的数据是： "+ aLong + " "+ aLong2);
                return aLong + aLong2;
            }
        }).subscribe(observerLong);
    };

    //出现错误后不会终止
    public void combineLatestDelayError(){

    };

    //把被观察者需要发送的事件聚合成1个事件 & 发送
    public void reduce(){
        Observable.just(1,2,3,4).reduce(new BiFunction<Integer, Integer, Integer>() {
            @NonNull
            @Override
            public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                return integer * integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e("=apply=", "最终计算的结果是： "+integer);
            }
        });
    };

    //将被观察者Observable发送的数据事件收集到一个数据结构里
    public void collect(){
   /*     Observable.just(1,2,3,4).collect(new Callable<ArrayList<Integer>>() {
            @Override
            public ArrayList<Integer> call() throws Exception {
                return new ArrayList<>();
            }
        }, new BiConsumer<ArrayList<Integer>, Integer>() {
            @Override
            public void accept(@NonNull ArrayList<Integer> integers, @NonNull Integer integer) throws Exception {
                integers.add(integer);
            }
        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(@NonNull ArrayList<Integer> integers) throws Exception {
                Log.e("=accept=","本次发送的数据是： "+integers);
            }
        });*/

    };

    //在一个被观察者发送事件前，追加发送一个数据 / 一个新的被观察者
    public void startWith(){
//        Observable.just(2,3,4).startWith(1).subscribe(observer);
    };

    //在一个被观察者发送事件前，追加发送一组数据 / 一个新的被观察者
    public void startWithArray(){
        Observable.just(2,3,4).startWithArray(0,1).subscribe(observer);
    };

    //统计被观察者发送事件的数量
    public void count(){
        Observable.just(1,2,3,4).count().subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                Log.e("=accept=", "发送的事件数量 =  "+aLong);
            }
        });
    };

    //订阅，即连接观察者 & 被观察者
    public void subscribe(){
        //1. 创建被观察者 & 生产事件
        //2. 通过通过订阅（subscribe）连接观察者和被观察者
        //3. 创建观察者 & 定义响应事件的行为
        Observable.just(1).subscribe(observer);
    };

    //使得被观察者延迟一段时间再发送事件
    // 1. 指定延迟时间
    // 参数1 = 时间；参数2 = 时间单位
    //delay(long delay,TimeUnit unit)
    // 2. 指定延迟时间 & 调度器
    // 参数1 = 时间；参数2 = 时间单位；参数3 = 线程调度器
    //delay(long delay,TimeUnit unit,mScheduler scheduler)
    // 3. 指定延迟时间  & 错误延迟
    // 错误延迟，即：若存在Error事件，则如常执行，执行后再抛出错误异常
    // 参数1 = 时间；参数2 = 时间单位；参数3 = 错误延迟参数
    //delay(long delay,TimeUnit unit,boolean delayError)
    // 4. 指定延迟时间 & 调度器 & 错误延迟
    // 参数1 = 时间；参数2 = 时间单位；参数3 = 线程调度器；参数4 = 错误延迟参数
    //delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器，错误通知可以设置是否延迟
    public void delay(){
        Observable.just(1).delay(1,TimeUnit.SECONDS).subscribe(observer);
    };

    //在某个事件的生命周期中调用
    public void dox(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new Throwable("发生错误了"));
            }
        })
                // 1. 当Observable每发送1次数据事件就会调用1次
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        Log.e("=accept=", "doOnEach: " + integerNotification.getValue());
                    }
                })
                // 2. 执行Next事件前调用
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("=accept=", "doOnNext: " + integer);
                    }
                })
                // 3. 执行Next事件后调用
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("=accept=", "doAfterNext: " + integer);
                    }
                })
                // 4. Observable正常发送事件完毕后调用
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e("=accept=", "doOnComplete: ");
                    }
                })
                // 5. Observable发送错误事件时调用
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e("=accept=", "doOnError: " + throwable.getMessage());
                    }
                })
                // 6. 观察者订阅时调用
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        Log.e("=accept=", "doOnSubscribe: ");
                    }
                })
                // 7. Observable发送事件完毕后调用，无论正常发送完毕 / 异常终止
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e("=accept=", "doAfterTerminate: ");
                    }
                })
                // 8. 最后执行
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e("=accept=", "doFinally: ");
                    }
                })
                .subscribe(observer);
    };

    //遇到错误时，发送1个特殊事件 & 正常终止 并填充兜底
    public void onErrorReturn(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("发生错误了"));
            }
        })
                .onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(@NonNull Throwable throwable) throws Exception {
                        // 捕捉错误异常
                        Log.e("=apply=", "在onErrorReturn处理了错误: "+throwable.toString() );

                        return 666;
                        // 发生错误事件后，发送一个"666"事件，最终正常结束
                    }
                })
                .subscribe(observer);
    };

    //遇到错误时，发送1个新的Observable 兜底
    public void onErrorResumeNext(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("发生错误了"));
            }
        })
                .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                    @Override
                    public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {

                        // 1. 捕捉错误异常
                        Log.e("=apply=","在onErrorReturn处理了错误: "+throwable.toString() );

                        // 2. 发生错误事件后，发送一个新的被观察者 & 发送事件序列
                        return Observable.just(11,22);

                    }
                })
                .subscribe(observer);
    };

    //遇到错误时，发送1个新的Observable
    //todo onExceptionResumeNext（）拦截的错误 = Exception；若需拦截Throwable请用onErrorResumeNext（）
    //todo 若onExceptionResumeNext（）拦截的错误 = Throwable，则会将错误传递给观察者的onError方法
    public void onExceptionResumeNext(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
            }
        })
        /*        .onExceptionResumeNext(new Observable<Integer>() {
                    @Override
                    protected void subscribeActual(Observer<? super Integer> observer) {
                        observer.onNext(11);
                        observer.onNext(22);
                        observer.onComplete();
                    }
                })*/
                .subscribe(observer);
    };

    //重试，即当出现错误时，让被观察者（Observable）重新发射数据
    //<-- 1. retry（） -->
    // 作用：出现错误时，让被观察者重新发送数据
    // 注：若一直错误，则一直重新发送
    //<-- 2. retry（long time） -->
    // 作用：出现错误时，让被观察者重新发送数据（具备重试次数限制
    // 参数 = 重试次数
    //<-- 3. retry（Predicate predicate） -->
    // 作用：出现错误后，判断是否需要重新发送数据（若需要重新发送& 持续遇到错误，则持续重试）
    // 参数 = 判断逻辑
    //<--  4. retry（new BiPredicate<Integer, Throwable>） -->
    // 作用：出现错误后，判断是否需要重新发送数据（若需要重新发送 & 持续遇到错误，则持续重试
    // 参数 =  判断逻辑（传入当前重试次数 & 异常错误信息）
    //<-- 5. retry（long time,Predicate predicate） -->
    // 作用：出现错误后，判断是否需要重新发送数据（具备重试次数限制
    // 参数 = 设置重试次数 & 判断逻辑
    public void retry(){
        // 作用：出现错误时，让被观察者重新发送数据
        // 注：若一直错误，则一直重新发送
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })
                .retry() // 遇到错误时，让被观察者重新发射数据（若一直错误，则一直重新发送
                .subscribe(observer);
        //<-- 2. retry（long time） -->
        // 作用：出现错误时，让被观察者重新发送数据（具备重试次数限制
        // 参数 = 重试次数
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })
                .retry(3) // 设置重试次数 = 3次
                .subscribe(observer);

        //<-- 3. retry（Predicate predicate） -->
        // 作用：出现错误后，判断是否需要重新发送数据（若需要重新发送& 持续遇到错误，则持续重试）
        // 参数 = 判断逻辑
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })
                // 拦截错误后，判断是否需要重新发送请求
                .retry(new Predicate<Throwable>() {
                    @Override
                    public boolean test(@NonNull Throwable throwable) throws Exception {
                        // 捕获异常
                        Log.e("=test=", "retry错误: "+throwable.toString());

                        //返回false = 不重新重新发送数据 & 调用观察者的onError结束
                        //返回true = 重新发送请求（若持续遇到错误，就持续重新发送）
                        return true;
                    }
                })
                .subscribe(observer);

        //<--  4. retry（new BiPredicate<Integer, Throwable>） -->
        // 作用：出现错误后，判断是否需要重新发送数据（若需要重新发送 & 持续遇到错误，则持续重试
        // 参数 =  判断逻辑（传入当前重试次数 & 异常错误信息）
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })
                // 拦截错误后，判断是否需要重新发送请求
                .retry(new BiPredicate<Integer, Throwable>() {
                    @Override
                    public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
                        // 捕获异常
                        Log.e("=test=", "异常错误 =  "+throwable.toString());

                        // 获取当前重试次数
                        Log.e("=test=", "当前重试次数 =  "+integer);

                        //返回false = 不重新重新发送数据 & 调用观察者的onError结束
                        //返回true = 重新发送请求（若持续遇到错误，就持续重新发送）
                        return true;
                    }
                })
                .subscribe(observer);

        //<-- 5. retry（long time,Predicate predicate） -->
        // 作用：出现错误后，判断是否需要重新发送数据（具备重试次数限制
        // 参数 = 设置重试次数 & 判断逻辑
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })
                // 拦截错误后，判断是否需要重新发送请求
                .retry(3, new Predicate<Throwable>() {
                    @Override
                    public boolean test(@NonNull Throwable throwable) throws Exception {
                        // 捕获异常
                        Log.e("=test=", "retry错误: "+throwable.toString());

                        //返回false = 不重新重新发送数据 & 调用观察者的onError（）结束
                        //返回true = 重新发送请求（最多重新发送3次）
                        return true;
                    }
                })
                .subscribe(observer);
    };

    //出现错误后，判断是否需要重新发送数据
    //Todo 作用类似于retry（Predicate predicate） 唯一区别：返回 true 则不重新发送数据事件。此处不作过多描述
    public void retryUntil(){
        // 作用：出现错误后，判断是否需要重新发送数据（若需要重新发送& 持续遇到错误，则持续重试）
        // 参数 = 判断逻辑
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })
                // 拦截错误后，判断是否需要重新发送请求
                .retryUntil(new BooleanSupplier() {
                    @Override
                    public boolean getAsBoolean() throws Exception {
                        //返回true = 不重新重新发送数据 & 调用观察者的onError结束
                        //返回false = 重新发送请求（若持续遇到错误，就持续重新发送）
                        return false;
                    }
                })
                .subscribe(observer);
    };
    //遇到错误时，将发生的错误传递给一个新的被观察者（Observable），并决定是否需要重新订阅原始被观察者（Observable）& 发送事件
    public void retryWhen(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })
                // 遇到error事件才会回调
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    @NonNull
                    @Override
                    public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                        // 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常，可通过该条件来判断异常的类型
                        // 返回Observable<?> = 新的被观察者 Observable（任意类型）
                        // 此处有两种情况：
                        // 1. 若 新的被观察者 Observable发送的事件 = Error事件，那么 原始Observable则不重新发送事件：
                        // 2. 若 新的被观察者 Observable发送的事件 = Next事件 ，那么原始的Observable则重新发送事件：
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {

                                // 1. 若返回的Observable发送的事件 = Error事件，则原始的Observable不重新发送事件
                                // 该异常错误信息可在观察者中的onError（）中获得
                                return Observable.error(new Throwable("retryWhen终止啦"));

                                // 2. 若返回的Observable发送的事件 = Next事件，则原始的Observable重新发送事件（若持续遇到错误，则持续重试）
                                // return Observable.just(1);
                            }
                        });

                    }
                }).subscribe(observer);
    };

    //重复不断地发送被观察者事件
    // 不传入参数 = 重复发送次数 = 无限次
    //repeat（）；
    // 传入参数 = 重复发送次数有限
    //repeatWhen（Integer int ）；
    // 注：
    // 1. 接收到.onCompleted()事件后，触发重新订阅 & 发送
    // 2. 默认运行在一个新的线程上
    public void repeat(){
        // 重复创建次数 = 2次
        Observable.just(1).repeat(2).subscribe(observer);
    };

    //有条件地、重复发送 被观察者事件
    //将原始 Observable 停止发送事件的标识（Complete（） / Error（））转换成1个 Object 类型数据传递给1个新被观察者（Observable），以此决定是否重新订阅 & 发送原来的 Observable
    //TODO 若新被观察者（Observable）返回1个Complete / Error事件，则不重新订阅 & 发送原来的 Observable
    //TODO 若新被观察者（Observable）返回其余事件时，则重新订阅 & 发送原来的 Observable
    public void repeatWhen(){
        Observable.just(1).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @NonNull
            @Override
            public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                // 将原始 Observable 停止发送事件的标识（Complete（） /  Error（））转换成1个 Object 类型数据传递给1个新被观察者（Observable）
                // 以此决定是否重新订阅 & 发送原来的 Observable
                // 此处有2种情况：
                // 1. 若新被观察者（Observable）返回1个Complete（） /  Error（）事件，则不重新订阅 & 发送原来的 Observable
                // 2. 若新被观察者（Observable）返回其余事件，则重新订阅 & 发送原来的 Observable
                return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {

                        // 情况1：若新被观察者（Observable）返回1个Complete（） /  Error（）事件，则不重新订阅 & 发送原来的 Observable
                        return Observable.empty();
                        // Observable.empty() = 发送Complete事件，但不会回调观察者的onComplete（）

                        // return Observable.error(new Throwable("不再重新订阅事件"));
                        // 返回Error事件 = 回调onError（）事件，并接收传过去的错误信息。

                        // 情况2：若新被观察者（Observable）返回其余事件，则重新订阅 & 发送原来的 Observable
                        // return Observable.just(1);
                        // 仅仅是作为1个触发重新订阅被观察者的通知，发送的是什么数据并不重要，只要不是Complete（） /  Error（）事件
                    }
                });
            }
        }).subscribe(observer);

    };

    //被观察者 （Observable） 在 子线程 中生产事件（如实现耗时操作等等）
    //观察者（Observer）在 主线程 接收 & 响应事件（即实现UI操作）
    //Schedulers.immediate()	当前线程 = 不指定线程	默认
    //AndroidSchedulers.mainThread()	Android主线程	操作UI
    //Schedulers.newThread()	常规新线程	耗时等操作
    //Schedulers.io()	io操作线程	网络请求、读写文件等io密集型操作
    //Schedulers.computation()	CPU计算操作线程	大量计算操作
    //Todo 若Observable.subscribeOn（）多次指定被观察者 生产事件的线程，则只有第一次指定有效，其余的指定线程无效
    //RxJava 执行顺序自下而上
    public void subscribeOn(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.e("=subscribe=",Thread.currentThread().getName());
                e.onNext(1);
            }
        })
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        Log.e("=test=",Thread.currentThread().getName());
                        return integer < 2;
                    }
                })
                //如果不切换都是主线程 只有第一次指定有效，其余的指定线程无效
                .subscribeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("=onSubscribe=",Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("=onNext=",Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    };
    //Todo 若Observable.observeOn（）多次指定观察者 接收 & 响应事件的线程，则每次指定均有效，即每指定一次，就会进行一次线程的切换
    public void  observeOn(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.e("=subscribe=",Thread.currentThread().getName());
                e.onNext(1);
            }
        })
                .subscribeOn(Schedulers.io())
                //即每指定一次，就会进行一次线程的切换
                .observeOn(AndroidSchedulers.mainThread())
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        Log.e("=test=",Thread.currentThread().getName());
                        return integer < 2;
                    }
                })
                //即每指定一次，就会进行一次线程的切换
                .observeOn(Schedulers.io())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("=onSubscribe=",Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("=onNext=",Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    };
}